// Closes the node: stops it first if it is still started, moves the lifecycle to closed, then runs the
// queued close actions via IOUtils.close(toClose).
//
// During concurrent close() calls we want to make sure that all of them return after the node has completed its shutdown cycle.
// If not, the hook that is added in Bootstrap#setup() will be useless:
// close() might not be executed, in case another (for example api) call to close() has already set some lifecycles to stopped.
// In this case the process will be terminated even if the first call to close() has not finished yet.
@Override
public synchronized void close() throws IOException {
    synchronized (lifecycle) {
        if (lifecycle.started()) {
            stop();
        }
        // Nothing more to do when the lifecycle could not be moved to closed.
        if (!lifecycle.moveToClosed()) {
            return;
        }
    }
    // NOTE(review): this excerpt omits the statements between the lifecycle check and the thread_pool step;
    // toClose and stopWatch are not declared anywhere in the visible code.
    toClose.add(() -> stopWatch.stop().start("thread_pool"));
    toClose.add(() -> injector.getInstance(ThreadPool.class).shutdown());
    // Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
    // See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
    // awaitClose if the node doesn't finish closing within the specified time.
    if (logger.isTraceEnabled()) {
        // Queued as the last close action, so the timing report is logged after the other actions have run.
        toClose.add(() -> logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint()));
    }
    IOUtils.close(toClose);
    logger.info("closed");
}
// Excerpt: stops the node's services one after another. The enclosing method's header and any earlier
// statements are not part of this excerpt.
injector.getInstance(SnapshotsService.class).stop();
injector.getInstance(SnapshotShardsService.class).stop();
injector.getInstance(RepositoriesService.class).stop();
// stop any changes happening as a result of cluster state changes
injector.getInstance(IndicesClusterStateService.class).stop();
// close discovery early to not react to pings anymore.
// This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
injector.getInstance(Discovery.class).stop();
// we close indices first, so operations won't be allowed on it
// NOTE(review): the comment above sits on ClusterService.stop(), while IndicesService is stopped last
// (see the end of this excerpt) - the comment looks stale; confirm against the current sources.
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
// Plugin-provided lifecycle components are stopped after the built-in services listed above.
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
// if we had scroll searchers etc or recovery going on we wait for to finish.
injector.getInstance(IndicesService.class).stop();
logger.info("stopped");
return this;
}
各模块的关闭有一定的顺序关系。以 doStop 为例,按下表所示的顺序依次调用各模块的 doStop 方法。
服务
简介
ResourceWatcherService
通用资源监视服务
HttpServerTransport
HTTP 传输服务,提供REST接口服务
SnapshotsService
快照服务
SnapshotShardsService
负责启动和停止shard级快照
RepositoriesService
Service responsible for maintaining and providing access to snapshot repositories on nodes
IndicesClusterStateService
收到集群状态信息后,处理其中索引相关操作
Discovery
集群拓扑管理
ClusterService
集群管理服务,主要处理集群任务,发布集群状态
NodeConnectionsService
节点连接管理服务
FsHealthService
Runs periodically and attempts to create a temp file to see if the filesystem is writable. If not, then it marks the path as unhealthy.
// Copy indices because we modify it asynchronously in the body of the loop final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet()); final CountDownLatch latch = new CountDownLatch(indices.size()); for (final Index index : indices) { indicesStopExecutor.execute(() -> { try { removeIndex(index, IndexRemovalReason.SHUTDOWN, "shutdown"); } finally { latch.countDown(); } }); } try { // 注意shardsClosedTimeout 这个值是在IndicesService的构造函数中初始化的 // this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); // 也就是说 CountDownLatch.await默认1天才会继续后面的流程 if (latch.await(shardsClosedTimeout.seconds(), TimeUnit.SECONDS) == false) { logger.warn("Not all shards are closed yet, waited {}sec - stopping service", shardsClosedTimeout.seconds()); } } catch (InterruptedException e) { // ignore } finally { indicesStopExecutor.shutdown(); } }
那什么时候会导致removeIndex执行一直无法返回呢?
调用链如下:
1. IndicesService removeIndex(final Index index, final IndexRemovalReason reason, final String extraInfo) =>
2. IndexService close(final String reason, boolean delete) =>
3. IndexService removeShard(int shardId, String reason) =>
4. IndexService removeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener) =>
5. IndexShard close(String reason, boolean flushEngine) =>
6. Engine flushAndClose() =>
/** * Flush the engine (committing segments to disk and truncating the * translog) and close it. */ public void flushAndClose() throws IOException { if (isClosed.get() == false) { logger.trace("flushAndClose now acquire writeLock"); // 可以看一下: // https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L857 // 由于写入操作已经加了读锁,此时写锁会等待,直到写入执行完毕。 // 因此数据写入过程不会被中断。但是由于网络模块被关闭,客户端的连接会被断开。 // 客户端应当作为失败处理,虽然ES服务端的写流程还在继续。 try (ReleasableLock lock = writeLock.acquire()) { logger.trace("flushAndClose now acquired writeLock"); try { logger.debug("flushing shard on close - this might take some time to sync files to disk"); try { // TODO we might force a flush in the future since we have the write lock already even though recoveries // are running. flush(); } catch (AlreadyClosedException ex) { logger.debug("engine already closed - skipping flushAndClose"); } } finally { close(); // double close is not a problem } } } awaitPendingClose(); }